Análise de Carteira com Forecasting e Reinforcement Learning

Introdução

Esta análise tem como objetivo demonstrar um fluxo de trabalho para buscar dados de mercado de ações, realizar previsões de preços (forecasting) e treinar um agente de Reinforcement Learning (RL) para gerar sinais de compra e venda. Utilizaremos R para a coleta inicial de dados e Python (via reticulate) para a modelagem e visualização.

Nota: As previsões e sinais gerados são para fins demonstrativos e educacionais, não constituindo recomendação financeira.

1. Configuração do Ambiente

Primeiro, vamos carregar as bibliotecas R necessárias e configurar o reticulate para usar nosso ambiente Python.

Mostrar/Ocultar Código
# Bibliotecas R
library(tidyverse) # Para manipulação de dados e ggplot2
library(plotly)    # Para gráficos interativos (se for recriar em R)
library(reticulate)  # Para executar código Python
library(dplyr)     # Especificamente para a função de busca de dados
library(quantmod)  # Para buscar dados financeiros

Configuração do Python com reticulate

Certifique-se de que o ambiente Python que você especificar abaixo tenha todas as bibliotecas Python necessárias instaladas: yahooquery, gymnasium, torch, numpy, pandas, matplotlib, yfinance, plotly.

Mostrar/Ocultar Código
# Exemplo de como especificar um ambiente conda:
# use_condaenv("meu_ambiente_python", required = TRUE)

# Ou um ambiente virtual:
# use_virtualenv("caminho/para/meu_ambiente_virtual", required = TRUE)

# Ou especificar o executável Python diretamente:
# use_python("/usr/bin/python3", required = TRUE)

# Se as bibliotecas não estiverem instaladas, você pode tentar instalá-las via reticulate:
# py_install(c("yahooquery", "gymnasium", "torch", "numpy", "pandas", "matplotlib", "yfinance", "plotly"), pip = TRUE)

E o bloco de importações de bibliotecas também precisa estar dentro de um bloco de código delimitado corretamente:

#| label: python-library-imports
#| message: false
#| warning: false

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from yahooquery import Ticker
import yfinance as yf
from collections import deque

import gymnasium as gym
from gymnasium import spaces

import torch
import torch.nn as nn
import torch.optim as optim

from plotly.subplots import make_subplots
import plotly.graph_objects as go
import plotly.express as px

import warnings
warnings.filterwarnings("ignore")

print("Bibliotecas Python importadas com sucesso.")

2. Aquisição de Dados de Preços

Utilizaremos um script R para buscar os preços de fechamento ajustados para os tickers selecionados e salvá-los em um arquivo CSV.

Mostrar/Ocultar Código
fetch_close_prices_qm <- function(tickers, start, end, cache_path = "prices_qm.csv") {
  # Se já existe CSV em cache, carrega e retorna
  if (file.exists(cache_path)) {
    df <- read.csv(cache_path, stringsAsFactors = FALSE) %>%
      mutate(date = as.Date(date))
    message("Dados carregados do cache: ", cache_path)
    return(df)
  }

  # Senão, faz o download para cada ticker
  all_data <- lapply(tickers, function(tk) {
    # getSymbols retorna um objeto xts com colunas Open, High, Low, Close, Volume, Adjusted
    xts_data <- tryCatch({
        getSymbols(tk, src = "yahoo", from = start, to = end, auto.assign = FALSE)
    }, error = function(e) {
        message(paste("Erro ao buscar dados para", tk, ":", e$message))
        return(NULL)
    })

    if (is.null(xts_data)) return(NULL)

    close_prices <- Ad(xts_data)  # usa Preço Ajustado (Adjusted Close)
    data.frame(
      date   = index(close_prices),
      ticker = tk,
      close  = as.numeric(close_prices),
      row.names = NULL
    )
  })

  # Remove NULLs (tickers com erro) e combina
  all_data <- all_data[!sapply(all_data, is.null)]
  if (length(all_data) == 0) {
    stop("Nenhum dado foi baixado para os tickers especificados.")
  }
  df <- bind_rows(all_data)

  # Salva em CSV para próximas execuções
  write.csv(df, cache_path, row.names = FALSE)
  message("Dados salvos no cache: ", cache_path)

  return(df)
}
Mostrar/Ocultar Código
tickers <- c("BRFS3.SA", "JBSS3.SA", "BEEF3.SA", "MRFG3.SA", "TSN", "HRL", "GIS")
start_date <- "2020-01-01" 
end_date <- format(Sys.Date(), "%Y-%m-%d") # Usar data atual para 'to'

df_prices_r <- fetch_close_prices_qm(tickers, start_date, end_date, cache_path = "prices_analise.csv") 
Dados carregados do cache: prices_analise.csv
Mostrar/Ocultar Código
tail(df_prices_r)
           date ticker close
9385 2025-05-08    GIS 54.71
9386 2025-05-09    GIS 54.50
9387 2025-05-12    GIS 54.84
9388 2025-05-13    GIS 53.77
9389 2025-05-14    GIS 53.28
9390 2025-05-15    GIS 54.40

3. Preparação e Análise Exploratória dos Dados (Python)

Carregamos os dados do CSV em um DataFrame pandas e o pivotamos para facilitar a análise por ticker.

Mostrar/Ocultar Código
import pandas as pd
# Carregar dados do CSV salvo pelo R
df_prices = pd.read_csv('prices_analise.csv', parse_dates=['date'])
print("Tail do df_prices carregado:")
Tail do df_prices carregado:
Mostrar/Ocultar Código
print(df_prices.tail())
           date ticker      close
9385 2025-05-09    GIS  54.500000
9386 2025-05-12    GIS  54.840000
9387 2025-05-13    GIS  53.770000
9388 2025-05-14    GIS  53.279999
9389 2025-05-15    GIS  54.400002
Mostrar/Ocultar Código
# Pivotear somente as colunas 'ticker' e 'close'
df_pivot = df_prices.pivot(index='date', columns='ticker', values='close')
df_pivot = df_pivot.reset_index() # Manter 'date' como coluna

print("\\nTail do df_pivot:")
\nTail do df_pivot:
Mostrar/Ocultar Código
print(df_pivot.tail())
ticker       date  BEEF3.SA   BRFS3.SA  ...   JBSS3.SA   MRFG3.SA        TSN
1380   2025-05-09      4.96  19.200001  ...  42.360001  19.850000  55.299999
1381   2025-05-12      5.07  19.709999  ...  41.889999  19.820000  55.990002
1382   2025-05-13      5.11  20.200001  ...  40.849998  20.090000  55.360001
1383   2025-05-14      5.15  19.680000  ...  39.349998  19.799999  54.500000
1384   2025-05-15      5.14  20.620001  ...  39.180000  20.660000  55.650002

[5 rows x 8 columns]

4. Forecasting de Preços (Python com Plotly)

Realizamos uma simulação simples de forecasting baseada na média e desvio padrão dos retornos logarítmicos históricos.

Mostrar/Ocultar Código
import numpy as np
import pandas as pd 
import plotly.express as px
# Defina a data de corte e o período do forecast
# Usar a data mais recente do df_pivot como CUT
CUT = df_pivot["date"].max()
forecast_days = 30
future_dates = pd.date_range(CUT + pd.Timedelta(days=1), periods=forecast_days, freq="D")

# Lista de ativos (tickers)
assets = df_pivot.columns[1:]  # Ignorando a coluna 'date'

# Lista para armazenar os dados de forecast
forecast_data = []

# Gera previsões para cada ativo (simulação simples)
for asset in assets:
    # Pega os dados históricos até a data de corte
    df_asset_hist = df_pivot[["date", asset]].copy() # Usar .copy() para evitar SettingWithCopyWarning
    df_asset_hist = df_asset_hist[df_asset_hist["date"] <= CUT]
    df_asset_hist.dropna(subset=[asset], inplace=True) # Remover NaNs que podem atrapalhar pct_change

    if len(df_asset_hist) < 2: # Precisa de pelo menos 2 pontos para pct_change
        print(f"Dados insuficientes para forecasting do ativo: {asset}")
        continue

    # Calcula a média e desvio padrão dos retornos históricos
    df_asset_hist["logret"] = df_asset_hist[asset].pct_change()
    # Remover o primeiro NaN de logret e quaisquer outros NaNs/infs
    df_asset_hist.replace([np.inf, -np.inf], np.nan, inplace=True)
    df_asset_hist.dropna(subset=['logret'], inplace=True)

    if df_asset_hist["logret"].empty:
        print(f"Não foi possível calcular retornos para o ativo: {asset}")
        mu = 0 # Default mu
        sigma = 0.01 # Default sigma para evitar erro com scale=0
    else:
        mu = df_asset_hist["logret"].mean()
        sigma = df_asset_hist["logret"].std()
        if pd.isna(sigma) or sigma == 0: # Adiciona uma pequena volatilidade se std for 0 ou NaN
            sigma = 0.01 

    # Simula os retornos futuros
    simulated_logrets = np.random.normal(loc=mu, scale=sigma, size=forecast_days)
    last_price = df_asset_hist[asset].iloc[-1]
    if pd.isna(last_price): # Se o último preço for NaN, use um preço padrão ou pule
        print(f"Último preço é NaN para o ativo: {asset}. Pulando forecast.")
        continue
        
    simulated_prices = last_price * (1 + simulated_logrets).cumprod()

    # Adiciona os dados de forecast
    for date_val, value in zip(future_dates, simulated_prices):
        forecast_data.append({
            "date": date_val,
            "asset": asset,
            "price": value,
            "rep": "Forecast"
        })

df_forecast = pd.DataFrame(forecast_data)

# Prepara o histórico para plotar junto, filtrando até a data de corte
hist_data = df_pivot[df_pivot["date"] <= CUT].copy()
hist_data = hist_data.melt(id_vars="date", var_name="asset", value_name="price")
hist_data["rep"] = "Histórico"

# Junta histórico e forecast
df_plot = pd.concat([hist_data, df_forecast], ignore_index=True)

# Filtra os dados para mostrar apenas o período relevante (últimos N dias de histórico + forecast)
# Por exemplo, últimos 60 dias de histórico + 30 dias de forecast
start_plot_date = CUT - pd.Timedelta(days=60)
end_plot_date = CUT + pd.Timedelta(days=forecast_days)

df_plot_filtered = df_plot[(df_plot["date"] >= start_plot_date) & (df_plot["date"] <= end_plot_date)]

if not df_plot_filtered.empty:
    fig_forecast = px.line(
        df_plot_filtered,
        x="date",
        y="price",
        color="rep",
        facet_col="asset",
        facet_col_wrap=2, # Ajuste conforme o número de tickers
        labels={"date": "Data", "price": "Preço (Moeda Local/USD)", "rep": "Série"},
        title=f"Forecasting de Preços ({forecast_days} dias) a partir de {CUT.strftime('%Y-%m-%d')}"
    )
    fig_forecast.update_layout(width=1000, height=300 * (len(assets)//2 + len(assets)%2)) # Ajusta altura
    fig_forecast.update_xaxes(matches=None, nticks=5)
    fig_forecast.show()
else:
    print("Nenhum dado para plotar no gráfico de forecast.")

Forecasting de Preços para os Tickers da Carteira (Próximos 30 dias)

5. Reinforcement Learning para Sinais de Trading

5.1. Definição do Agente e Funções Auxiliares

Definimos a função getState e a classe Agent que representa nosso agente de RL.

Mostrar/Ocultar Código
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
def getState(data, t, window_size):
    """
    Converte uma janela de preços em vetor de retornos normalizados.
    """
    d = t - window_size + 1
    block = data[d:t+1] if d >= 0 else -d * [data[0]] + list(data[0:t+1])
    # Evitar divisão por zero se block[i] for 0
    res = []
    for i in range(len(block)-1):
        if block[i] != 0:
            res.append((block[i+1] - block[i]) / block[i])
        else:
            res.append(0) # Retorno zero se o preço base for zero
    return np.array(res, dtype=np.float32)

class Agent(nn.Module):
    def __init__(
        self,
        state_size,
        hidden_size=64,
        lr=1e-4,
        gamma=0.95,
        epsilon=1.0,
        epsilon_min=0.01,
        epsilon_decay=0.995
    ):
        super(Agent, self).__init__()
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.inventory = [] # Adicionado para manter o inventário do agente
        
        self.model = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 3)  # Q para 3 ações: 0=HOLD, 1=BUY, 2=SELL
        )
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.criterion = nn.MSELoss()

    def act(self, state):
        if np.random.rand() < self.epsilon:
            return np.random.choice([0,1,2]) # 0: HOLD, 1: BUY, 2: SELL
        state_t = torch.from_numpy(state).unsqueeze(0)
        q_values = self.model(state_t).detach().numpy()[0]
        return np.argmax(q_values)

    def train_step(self, state, action, reward, next_state, done): # Adicionado 'done'
        state_t = torch.from_numpy(state).unsqueeze(0)
        next_t = torch.from_numpy(next_state).unsqueeze(0)
        
        q_values = self.model(state_t)
        
        with torch.no_grad():
            q_next = self.model(next_t).max(1)[0]
            if done: # Se for o estado terminal, o valor do próximo estado é 0
                 target_q_value = reward
            else:
                 target_q_value = reward + self.gamma * q_next

        target = q_values.clone().detach()
        target[0, action] = target_q_value
        
        loss = self.criterion(q_values, target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

print("Definições do Agente RL carregadas.")
Definições do Agente RL carregadas.

5.2. Treinamento do Agente RL

Treinamos o agente para cada ticker da nossa lista.

Mostrar/Ocultar Código
import numpy as np
import torch
import torch.nn as nn
# Parâmetros de treinamento
window_size = 10  # Deve ser state_size - 1 se getState retorna len(block)-1
episodes    = 50 # Reduzido para demonstração rápida, pode aumentar para melhor performance
rl_results  = {}

# Tickers para o treinamento (obtidos do df_pivot)
# A primeira coluna é 'date', então pegamos da segunda em diante
train_tickers = df_pivot.columns[1:].tolist() 

for tk in train_tickers:
    print(f"\\n=== Treinando para {tk} ===")
    
    # Prepara série de preços para o ticker
    prices = df_prices[df_prices['ticker'] == tk].sort_values("date")['close'].values
    prices = prices[~np.isnan(prices)] # Remover NaNs dos preços

    if len(prices) < window_size + 2: # Checagem mais robusta para dados suficientes
        print(f"Dados insuficientes para {tk} após remover NaNs. Pulando ticker.")
        rl_results[tk] = [0] * episodes # Adiciona placeholder para evitar erro no plot
        continue

    # O state_size é o tamanho da saída de getState, que é window_size
    agent = Agent(state_size=window_size) 
    total_profits_tk = []

    for e in range(episodes):
        state = getState(prices, 0, window_size + 1) # getState espera window_size + 1 para gerar 'window_size' retornos
        agent.inventory = []
        total_profit   = 0.0 # Inicializar como float

        for t in range(len(prices)-1): # Loop até o penúltimo preço
            action     = agent.act(state)
            # O next_state é para o tempo t+1, então o último t+1 será len(prices)-1
            next_state = getState(prices, t + 1, window_size + 1)
            reward     = 0.0 # Inicializar como float
            done = (t == len(prices) - 2) # 'done' é true no último passo

            # Executa ação: BUY, SELL ou HOLD
            if action == 1:  # BUY
                agent.inventory.append(prices[t])
            elif action == 2 and agent.inventory:  # SELL
                bought_price = agent.inventory.pop(0)
                profit       = prices[t] - bought_price
                if bought_price != 0: # Evitar divisão por zero
                    reward = profit / bought_price
                else:
                    reward = 0.0
                total_profit += profit
            
            agent.train_step(state, action, reward, next_state, done)
            state = next_state

        total_profits_tk.append(total_profit)
        if (e+1) % 10 == 0 or e == episodes -1 : # Imprimir a cada 10 episódios e no último
            print(f"Episódio {e+1}/{episodes} — Lucro: {total_profit:.2f}")
    
    rl_results[tk] = total_profits_tk
\n=== Treinando para BEEF3.SA ===
Episódio 10/50 — Lucro: -8.68
Episódio 20/50 — Lucro: -9.37
Episódio 30/50 — Lucro: -14.95
Episódio 40/50 — Lucro: -1.96
Episódio 50/50 — Lucro: -21.28
\n=== Treinando para BRFS3.SA ===
Episódio 10/50 — Lucro: -39.67
Episódio 20/50 — Lucro: -33.95
Episódio 30/50 — Lucro: -31.54
Episódio 40/50 — Lucro: 27.04
Episódio 50/50 — Lucro: -50.85
\n=== Treinando para GIS ===
Episódio 10/50 — Lucro: 5123.43
Episódio 20/50 — Lucro: 154.56
Episódio 30/50 — Lucro: 359.13
Episódio 40/50 — Lucro: 515.71
Episódio 50/50 — Lucro: 406.98
\n=== Treinando para HRL ===
Episódio 10/50 — Lucro: -26.49
Episódio 20/50 — Lucro: -7.75
Episódio 30/50 — Lucro: -5.98
Episódio 40/50 — Lucro: 2.68
Episódio 50/50 — Lucro: -7.12
\n=== Treinando para JBSS3.SA ===
Episódio 10/50 — Lucro: -0.13
Episódio 20/50 — Lucro: 21.78
Episódio 30/50 — Lucro: 22.84
Episódio 40/50 — Lucro: 81.23
Episódio 50/50 — Lucro: 21.15
\n=== Treinando para MRFG3.SA ===
Episódio 10/50 — Lucro: 5.80
Episódio 20/50 — Lucro: -0.15
Episódio 30/50 — Lucro: 389.08
Episódio 40/50 — Lucro: 433.31
Episódio 50/50 — Lucro: 15.82
\n=== Treinando para TSN ===
Episódio 10/50 — Lucro: 62.27
Episódio 20/50 — Lucro: 31.34
Episódio 30/50 — Lucro: -20.24
Episódio 40/50 — Lucro: -58.33
Episódio 50/50 — Lucro: 8.97
Mostrar/Ocultar Código
# Plot da evolução do lucro
if rl_results: # Apenas plotar se houver resultados
    df_hist_profit = pd.DataFrame(rl_results)
    # Adicionar coluna 'Episódio' se o índice não for usado diretamente
    if not isinstance(df_hist_profit.index, pd.RangeIndex) or df_hist_profit.index.name != 'Episódio':
        df_hist_profit = df_hist_profit.reset_index().rename(columns={'index': 'Episódio'})
        # Se o índice já é RangeIndex (0 a N-1), apenas nomeie-o ou use-o diretamente
    elif df_hist_profit.index.name != 'Episódio':
         df_hist_profit.index.name = 'Episódio'
         df_hist_profit = df_hist_profit.reset_index()

    df_melt_profit = df_hist_profit.melt(
        id_vars='Episódio',
        var_name='ticker',
        value_name='Lucro'
    )

    fig_profit_evol = px.line(
        df_melt_profit,
        x='Episódio',
        y='Lucro',
        color='ticker',
        title='Evolução do Lucro Total por Episódio (Treinamento RL)'
    )
    fig_profit_evol.update_layout(
        xaxis_title='Episódio',
        yaxis_title='Lucro Total (Moeda Local/USD)'
    )
    fig_profit_evol.show()
else:
    print("Nenhum resultado de treinamento RL para plotar.")

Evolução do Lucro Total por Episódio Durante o Treinamento do Agente RL

5.3. Geração de Sinais de Trading e Visualização

Após o treinamento, usamos o agente para gerar sinais de COMPRA/VENDA e os visualizamos.

Mostrar/Ocultar Código
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots # Importe make_subplots aqui
import torch
import torch.nn as nn
# 1) Gera sinais para cada ticker
all_signals = {}
# Usar a última instância do agente treinada ou treinar um novo/carregar
# Para este exemplo, vamos reusar a última instância 'agent' do loop de treinamento,
# que foi treinada no último ticker da lista 'train_tickers'.
# Idealmente, você teria um agente treinado por ticker ou um agente geral.
# Aqui, vamos gerar sinais para todos os tickers usando o agente treinado no ÚLTIMO ticker.
# Isto é mais para demonstração da plotagem.
# Para uma análise real, você deveria ter um agente específico por ticker ou um agente treinado em todos.

# Se 'agent' não foi definido (ex: todos os tickers foram pulados no treinamento)
if 'agent' not in locals() and train_tickers:
    print("Agente não treinado. Treinando um agente no primeiro ticker disponível para demonstração de sinais.")
    tk_demo = train_tickers[0]
    prices_demo = df_prices[df_prices['ticker'] == tk_demo].sort_values("date")['close'].values
    prices_demo = prices_demo[~np.isnan(prices_demo)]
    if len(prices_demo) >= window_size + 2:
        agent = Agent(state_size=window_size)
        # Treinamento rápido apenas para ter um agente
        for e_demo in range(5): # Treino muito curto
            state_demo = getState(prices_demo, 0, window_size + 1)
            for t_demo in range(len(prices_demo) -1):
                action_demo = agent.act(state_demo)
                next_state_demo = getState(prices_demo, t_demo + 1, window_size + 1)
                # Recompensa e 'done' simplificados para este agente de demonstração
                agent.train_step(state_demo, action_demo, 0, next_state_demo, (t_demo == len(prices_demo) - 2))
                state_demo = next_state_demo
    else:
        agent = None # Não foi possível treinar agente de demonstração
        print(f"Não foi possível treinar agente de demonstração para {tk_demo}")


if agent: # Prossiga apenas se o agente existir
    for tk_signal in train_tickers: # Usar train_tickers para consistência
        agent.epsilon = agent.epsilon_min # Usar política greedy para geração de sinais
        
        current_prices_tk = df_prices[df_prices.ticker==tk_signal].sort_values('date')
        
        if current_prices_tk.empty or 'close' not in current_prices_tk.columns:
            print(f"Aviso: Nenhum dado de preço para {tk_signal} na geração de sinais. Pulando.")
            all_signals[tk_signal] = pd.DataFrame(columns=['date', 'action', 'price'])
            continue

        dates_signal  = current_prices_tk['date'].values
        values_signal = current_prices_tk['close'].values
        values_signal = values_signal[~np.isnan(values_signal)] # Remover NaNs

        if len(values_signal) < window_size + 2:
            print(f"Dados insuficientes para {tk_signal} na geração de sinais após remover NaNs. Pulando.")
            all_signals[tk_signal] = pd.DataFrame(columns=['date', 'action', 'price'])
            continue
            
        state_signal = getState(values_signal, 0, window_size+1)
        agent.inventory = [] # Resetar inventário para cada ticker
        signals_current_tk = []

        for t_signal in range(len(values_signal)-1):
            action_signal = agent.act(state_signal)
            date_val  = dates_signal[t_signal]
            price_val = values_signal[t_signal]
            
            if action_signal == 1: # BUY
                signals_current_tk.append({'date': date_val, 'action': 'BUY',  'price': price_val})
                agent.inventory.append(price_val)
            elif action_signal == 2 and agent.inventory: # SELL
                signals_current_tk.append({'date': date_val, 'action': 'SELL', 'price': price_val})
                agent.inventory.pop(0)
            
            next_state_signal = getState(values_signal, t_signal+1, window_size+1)
            state_signal = next_state_signal

        if signals_current_tk:
            all_signals[tk_signal] = pd.DataFrame(signals_current_tk)
        else:
            all_signals[tk_signal] = pd.DataFrame(columns=['date', 'action', 'price'])
else:
    print("Agente RL não está definido. Pulando geração e visualização de sinais.")
    all_signals = {tk: pd.DataFrame(columns=['date', 'action', 'price']) for tk in train_tickers}
np.float64(9.65421581268311)
np.float64(5.58508443832397)
np.float64(6.22337961196899)
np.float64(6.54252672195435)
np.float64(6.53454780578613)
np.float64(10.2605962753296)
np.float64(10.7074041366577)
np.float64(11.2419757843018)
np.float64(8.19039058685303)
np.float64(7.68423557281494)
np.float64(8.23179340362549)
np.float64(12.048752784729)
np.float64(12.993390083313)
np.float64(14.7344846725464)
np.float64(13.7713260650635)
np.float64(16.1143970489502)
np.float64(21.3562049865723)
np.float64(20.9857597351074)
np.float64(20.0133380889893)
np.float64(17.2535171508789)
np.float64(45.6658248901367)
np.float64(48.5672416687012)
np.float64(45.4976196289062)
np.float64(47.2843551635742)
np.float64(78.1774749755859)
np.float64(78.542366027832)
np.float64(38.0741767883301)
np.float64(41.9598236083984)
np.float64(40.2908248901367)
np.float64(42.8776893615723)
np.float64(43.0357131958008)
np.float64(12.1137619018555)
np.float64(12.6148347854614)
np.float64(14.3685894012451)
np.float64(14.1451368331909)
np.float64(14.090968132019)
np.float64(9.03977966308594)
np.float64(9.05292797088623)
np.float64(4.9833836555481)
np.float64(5.193763256073)
np.float64(5.193763256073)
np.float64(5.45016479492188)
np.float64(5.60137557983398)
np.float64(5.81832981109619)
np.float64(49.8331680297852)
np.float64(54.6694068908691)
np.float64(50.2830467224121)
np.float64(56.8139953613281)
np.float64(71.6482849121094)
np.float64(89.2217330932617)
np.float64(89.3728256225586)
Mostrar/Ocultar Código
# 2) Cria figura com uma linha por ticker
if train_tickers and all_signals : # Apenas se houver tickers e sinais
    fig_signals = make_subplots(
        rows=len(train_tickers), cols=1,
        shared_xaxes=True,
        subplot_titles=train_tickers,
        vertical_spacing=0.02
    )

    for i, tk_plot in enumerate(train_tickers, start=1):
        prices_tk_plot = df_prices[df_prices.ticker==tk_plot].sort_values('date')
        sig_df_plot = all_signals.get(tk_plot, pd.DataFrame(columns=['date', 'action', 'price']))

        if not prices_tk_plot.empty and 'close' in prices_tk_plot.columns:
            fig_signals.add_trace(
                go.Scatter(x=prices_tk_plot['date'], y=prices_tk_plot['close'], mode='lines', name=f'Preço {tk_plot}', legendgroup=f'group{tk_plot}'),
                row=i, col=1
            )
        
        buy_signals_plot = sig_df_plot.query("action=='BUY'")
        if not buy_signals_plot.empty:
            fig_signals.add_trace(
                go.Scatter(x=buy_signals_plot['date'],
                           y=buy_signals_plot['price'],
                           mode='markers', marker_symbol='triangle-up',
                           marker_size=8, marker_color='green', 
                           name=f'Compra', showlegend=(i==1), legendgroup=f'group_buy'), # Mostrar legenda apenas uma vez
                row=i, col=1
            )
        
        sell_signals_plot = sig_df_plot.query("action=='SELL'")
        if not sell_signals_plot.empty:
            fig_signals.add_trace(
                go.Scatter(x=sell_signals_plot['date'],
                           y=sell_signals_plot['price'],
                           mode='markers', marker_symbol='triangle-down',
                           marker_size=8, marker_color='red',
                           name=f'Venda', showlegend=(i==1), legendgroup=f'group_sell'), # Mostrar legenda apenas uma vez
                row=i, col=1
            )

    fig_signals.update_layout(
        height=max(300 * len(train_tickers), 800), # Ajusta altura dinamicamente, mínimo de 800px
        title_text='Sinais de Compra/Venda por Ticker (Agente RL)',
        legend_tracegroupgap = 180 # Espaçamento entre grupos de legenda
    )
    fig_signals.update_yaxes(title_text="Preço") 
    # Aplicar título do eixo X apenas ao último subplot visível
    # Encontrar o último subplot que realmente tem dados para o eixo X
    last_row_with_data = 0
    for r in range(len(train_tickers), 0, -1):
        if not df_prices[df_prices.ticker==train_tickers[r-1]].empty:
            last_row_with_data = r
            break
    if last_row_with_data > 0:
      fig_signals.update_xaxes(title_text="Data", row=last_row_with_data, col=1)
    
    fig_signals.show()
else:
    print("Nenhum ticker ou sinal para plotar.")

Sinais de Compra/Venda Gerados pelo Agente RL por Ticker

6. Conclusão

Este documento demonstrou um pipeline para análise de dados financeiros, incluindo coleta de dados, forecasting e a aplicação de um agente de Reinforcement Learning para gerar sinais de trading. Os resultados visuais do forecasting e dos sinais do agente RL fornecem insights que podem auxiliar na tomada de decisões de investimento, lembrando sempre da importância de análises complementares e do gerenciamento de risco.

Os gráficos de evolução do lucro durante o treinamento do agente RL indicam a capacidade de aprendizado do modelo em diferentes ativos, embora a performance possa variar significativamente. A visualização final dos sinais de compra e venda sobrepostos aos preços históricos permite uma avaliação qualitativa da estratégia do agente.